RDD 常用算子

Note

从是否触发计算来说,RDD 算子可分为 Transformations 类算子和 Actions 类算子。
从算子用途来说,RDD 有数据转换、数据聚合、数据持久化等类型的算子。

算子类型

适用范围

算子用途

算子集合

Transformations

任意RDD

RDD内数据转换

map
mapPartitions
flatMap
filter

Paired RDD

RDD内数据聚合

groupByKey
reduceByKey
aggregateByKey
sortByKey
sortBy

任意RDD

数据整合

union
sample

任意RDD

重分布

coalesce
repartition

Actions

任意RDD

数据收集

collect
first
take

任意RDD

数据持久化

saveAsTextFile

RDD内数据转换

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("rdd operator").setMaster("local")
sc = SparkContext(conf=conf)

# 第一节的代码
lineRDD = sc.textFile("../data/wikiOfSpark.txt")
# flatMap: 先从元素到集合、再从集合到元素
wordRDD = lineRDD.flatMap(lambda line: line.split(" "))
# 调用 filter(f),其作用是保留 RDD 中 f 返回 True 的数据元素,过滤其它元素。
cleanWordRDD = wordRDD.filter(lambda word: word != "")

map

给定映射函数 f,map(f) 以元素为粒度对 RDD 做数据转换。
f 可以是带名函数也可以是匿名函数,效果都是一样的。

def f(word):
    return (word, 1)

# 带名函数
kvRDD = cleanWordRDD.map(f)
# 匿名函数,效果同上
kvRDD = cleanWordRDD.map(lambda word: (word, 1))

mapPartitions

以数据分区为粒度,使用映射函数 f 对 RDD 进行数据转换。
可以使用 mapPartitions 来改善执行性能。

from hashlib import md5

def f(partition):
    """在处理每一条数据记录的时候,可用复用同一个Partition内的md5对象"""
    m = md5()
    for word in partition:
        m.update(word.encode())
        yield m.hexdigest()
        
# 先加密 word,再做转化
kvMD5RDD = cleanWordRDD.mapPartitions(f).map(lambda word: (word, 1))
print(kvMD5RDD.take(2))
[('e9713ae04a02a810d6f33dd956f42794', 1), ('032d2d5e07dd65f436bf59e8135822d2', 1)]

RDD内数据聚合

接下来要介绍5个聚合算子,在它们计算的过程中,都会引入 Shuffle。

groupByKey

groupByKey 的功能是对 Key 值相同的元素做分组,然后把相应的 Value 值,以集合的形式收集到一起。

# Key 和 Value 都变为单词
kvSameRDD = cleanWordRDD.map(lambda word: (word, word))
# [(Spark, (Spark, Spark, Spark)), (Streaming, (Streaming, Streaming))] 这样的数据
words = kvSameRDD.groupByKey()

reduceByKey

分组聚合

# 随机的value
kvRandomRDD = cleanWordRDD.map(lambda word: (word, random.randint(0, 100)))
# 聚合函数: 同组内最大的value
wordCounts = kvRandomRDD.reduceByKey(lambda x, y: max(x, y))

aggregateByKey

reduceByKey: Map 端聚合函数和 Reduce 端聚合函数都一样。
aggregateByKey: 分别指定 Map 端聚合函数和 Reduce 端聚合函数。

def f1(x, y):
    # 定义 Map 阶段聚合函数
    return x + y

def f2(x, y):
    # 定义 Reduce 阶段聚合函数
    return max(x, y)

# 初始值(需与f2结果类型保持一致),Map 函数,Reduce 函数
wordCounts = kvRDD.aggregateByKey(0, f1, f2)

sortByKey 和 sortBy

以 Key 为准对 RDD 做排序。

print(wordCounts.sortByKey(ascending=False).take(5))
[('|conference=', 2), ('|', 3), ('your', 1), ('you', 3), ('years', 1)]
print(wordCounts.sortBy(lambda x: x[1], ascending=False).take(5))
[('the', 67), ('Spark', 63), ('a', 54), ('and', 51), ('of', 50)]

数据整合

words1 = ["Spark", "is", "cool"]
words2 = ["what", "is", "Apache"]
rdd1 = sc.parallelize(words1)
rdd2 = sc.parallelize(words2)

# union: 合并两个同类型RDD
rdd = rdd1.union(rdd2)
rdd = sc.parallelize(list(range(100)))
# sample: 对RDD做随机采样
# 采样是否有放回,采样比例,随机数种子(可选)
print(rdd.sample(False, 0.1, 123).take(10))
[5, 7, 9, 13, 16, 39, 48, 53, 55, 71]

重分布

# 查看分区数量
print(rdd.getNumPartitions())

# repartition: 调整RDD的并行度(即RDD的数据分区数量),可增可降
rdd1 = rdd.repartition(20)
print(rdd1.getNumPartitions())

# coalesce: 降低RDD的并行度,不会触发shuffle
rdd2 = rdd1.coalesce(5)
print(rdd2.getNumPartitions())
1
20
5

数据收集

take 我们已经很熟悉了,即收集数个元素。
collect: 收集所有元素。
first: 收集所有元素。

print(rdd.collect())
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
print(rdd.first())
0

数据持久化

import os

path = "../data/persist"
if not os.path.exists(path=path):
    cleanWordRDD.saveAsTextFile(path=path)